跳到主要内容

Scrapy 学习

scrapy.Request

scrapy.Request 可以传入的参数:

scrapy.Request(url, callback, method='GET', headers, body, cookies, meta, dont_filter=False)

# callback 指定传入的 url 交给哪个解析函数去处理
# meta 实现在不同的解析函数中传递数据,meta 默认会携带部分信息(传入的参数是一个字典)
# dont_filter 让 scrapy 的去重不会过滤当前 url(scrapy 默认会去除重复的请求),用途是去爬一些响应会变的数据

# meta 的用法
yield scrapy.Request(
response.urljoin(next_url),
# 这个回调可以设置成其它的函数,不一定是要调用自己
self.parse02,
meta = {'item':item}
)

def parse02(self, response):
item = response.meta['item']

Request

属性

url
method
headers
body
meta
copy() 复制一个相同的request
replace()

通过Request传递数据的方法(在两个不同的解析函数之间传递数据的方法。)

def parse_page1(self, response):
item = MyItem()
item['main_url'] = response.url
request = scrapy.Request("http://www.example.com/some_page.html",
callback=self.parse_page2)
request.meta['item'] = item
return request

def parse_page2(self, response):
item = response.meta['item']
item['other_url'] = response.url
return item

主要的子类是 FormRequest(可以模拟表单登陆)

# 用于存储用户名,密码等数据
formdata

# 模拟表单或 Ajax 提交 post 请求的时候(默认是 post 请求)
from_response

实例:使用 FormRequest.from_response() 方法模拟用户登录

通常网站通过 实现对某些表单字段(如数据或是登录界面中的认证令牌等)的预填充。 使用 Scrapy 抓取网页时,如果想要预填充或重写像用户名、用户密码这些表单字段, 可以使用 FormRequest.from_response() 方法实现。下面是使用这种方法的爬虫例子:

import scrapy

class LoginSpider(scrapy.Spider):
name = 'example.com'
start_urls = ['http://www.example.com/users/login.php']

# 之所以还需要传递给另一个方法来发送登陆请求是因为默认 parse 发送的是 POST 请求
def parse(self, response):
return scrapy.FormRequest.from_response( # 从 response 返回一个request(FormRequest)
response,
formdata={'username': 'john', 'password': 'secret'},
callback=self.after_login
)
# 或者
# yield FormRequest.from_response( # 从 response 返回一个request(FormRequest)
# response,
# formdata={'username': 'john', 'password': 'secret'},
# callback=self.after_login
# )

def after_login(self, response):
# check login succeed before going on
if "authentication failed" in response.body:
self.logger.error("Login failed")
return

# continue scraping with authenticated session...

如果一开始就想发送 POST 请求可以重写 start_requests(self) 方法,并且不再调用 start_urls 里的 url

class mySpider(scrapy.Spider):
# start_urls = ["http://www.example.com/"]

def start_requests(self):
url = 'http://www.renren.com/PLogin.do'

# FormRequest 是Scrapy发送POST请求的方法
yield scrapy.FormRequest(
url = url,
formdata = {"email" : "xxx", "password" : "xxxxx"},
callback = self.parse_page
)
def parse_page(self, response):
# do something

Response

属性

url
status
headers
body
request # 是产生该 response 的 request
meta # 是 request.meta 的简要形式
flags
copy()
replace()
urljoin() # 用相对连接生成绝对连接。

如果想要获取 js 代码的字段例子:

# 搭配正则表达式找到字段的数据
page_count = re.findall("var pagecount=(.*?);", response.body.decode)[0]

模拟携带 Cookie 登陆 应用场景:

1、cookie 过期时间很长
2、配合其它程序使用,例如使用 selenium 把登陆之后的 cookie 保存本地,scrapy 发送请求前先读取本地 cookie

如果要观察 Cookie 可以在 Setting 里面打开 COOKIES_DEBUG=True

因为默认 Scrapy 会先发送请求,所以如果第一次请求就携带 Cookie 需要重写 Scrapy 的 start_requests(self) 方法

class mySpider(scrapy.Spider):
allowed_domains = ['example.com']
start_urls = ["http://www.example.com/"]

def start_requests(self):
# FormRequest 是Scrapy发送POST请求的方法
# 默认的 cookies 是如下这种字符串
cookies = "a=1;b=2;c=3;"
# 所以需要手动切割字符串(可以直接把 for 写在里面)
cookies = {i.split("=")[0]:i.split("=")[1] for i in cookies.split(";")}
yield scrapy.Request(
self.start_urls[0],
callback=self.parse,
cookies=cookies
)

def parse(self, response):
pass

修改请求头

如果要修改请求头也是在 setting.py 文件里面加,里面有个被注释的 DEFAULT_REQUEST_HEADERS 属性,取消注释然后修改下就好了

DEFAULT_REQUEST_HEADERS = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en',
}

代理池

免费代理池 kuaidaili 免费代理池 kxdaili 免费代理池 66ip

就算再怎么修改请求头但是每次去请求的都是同一个 ip 依然会给检测出来,所以可以使用 ip 池来更换不同的 ip对服务器进行访问

设置代理的这个需要在下载中间件里配置

添加代理


读取页内例子

读取列表里面的页面的例子

import scrapy
import re
import time
import random


class Study01Spider(scrapy.Spider):
name = 'study01'
allowed_domains = ['ssr1.scrape.center']
start_urls = ['https://ssr1.scrape.center/']
max_time = 2.0
min_time = 0.5

def parse(self, response):

items = response.xpath('//div[@class="el-col el-col-18 el-col-offset-3"]//div[@class="el-row"]')
for it in items:
# 内循环遍历获取类型
categories = it.xpath('.//button/span/text()')
category_list = []
for category in categories:
category_list.append(category.get())

item = {
"image_url": it.xpath('.//img/@src').get(),
"href_url": it.xpath('.//a[@class="name"]/@href').get(),
"name": it.xpath('.//h2/text()').get(),
"categories": category_list,
"country": it.xpath('.//div[@class="m-v-sm info"][1]/span[1]/text()').get(),
"release_time": it.xpath('.//div[@class="m-v-sm info"][2]/span[1]/text()').get(),
"duration": it.xpath('.//div[@class="m-v-sm info"][1]/span[3]/text()').get(),
"grade": float(re.search('[0-9].?[0-9]', it.xpath('.//p/text()').get()).group(0)),
}

# 先睡眠下再发
time.sleep((random.random() * self.max_time) + self.min_time)
# 跳转到页面
yield scrapy.Request(
response.urljoin(item["href_url"]),
self.parse02,
meta={'item': item}
)

# 找到下一页地址
next_url = response.xpath('//button[@class="btn-next"]/../@href').get()
# 要先判断非空
if next_url is not None:
print('=========================下一页========================')
yield scrapy.Request(
response.urljoin(next_url),
# 这个回调可以设置成其它的函数,不一定是要调用自己
self.parse
)

def parse02(self, response):
# 可以直接取得之前就获取的信息
item = response.meta['item']

# 取得电影说明:
drama = re.search('[^\\n\s].?[^\\n]*',
response.xpath('//div[@class="el-card__body"]//div[@class="drama"]/p/text()').get()).group(0)
# 导演列表
directors = response.xpath(
'//div[@class="directors el-row"]//div[@class="el-card is-hover-shadow"]/div[@class="el-card__body"]')
directors_list = []
for ji in directors:
directors_list.append({
'director': ji.xpath('./p/text()').get(),
'director_img_url': ji.xpath('./img/@src').get()
})

# 演员列表
actors = response.xpath('//div[@class="actor el-col el-col-4"]')
actors_list = []
for actor in actors:
actors_list.append({
'actor_name': actor.xpath('.//p[1]/text()').get(),
'actor_image_url': actor.xpath('.//img/@src').get(),
'actor_role': re.search('[^饰:].*', actor.xpath('.//p[2]/text()').get()).group(0)
})
# 剧照地址
movie_poster = response.xpath('//div[@class="photos el-row"]')
movie_poster_list = []
for it in movie_poster:
movie_poster_list.append(it.xpath('.//img/@src').get())

yield ({
'item': item,
'directors_list': directors_list,
'drama': drama,
'actors_list': actors_list,
'movie_poster_list': movie_poster_list
})


if __name__ == '__main__':
from scrapy import cmdline

cmdline.execute("scrapy crawl study01".split())

然后再把读取的数据通过 pipelines 保存到 Json 文件里

import json


# 可以有多个 Pipeline,执行顺序是在设置文件里更改
class TempPipeline:
# 给文件追加写入的权限
f = open("./temp.json", "ta", encoding='UTF-8')

# 一个管道类有以下三个生命周期函数
def open_spider(self, spider):
print("当前开启的爬虫为:", spider)

def process_item(self, item, spider):
# 写入 json 文件
self.f.write(json.dumps(item, ensure_ascii=False) + ',\n')
return item

def close_spider(self, spider):
# 在这里关闭,避免频繁的开关文件
print("爬虫%s关闭!" % spider)

基本例子

# -*- coding:utf-8 -*-
import scrapy

# 创建爬虫类,并且继承 scrapy.Spider
class TestspiderSpider(scrapy.Spider):
name = 'testSpider'
# 这里就爬取自己的博客吧
allowed_domains = ['alsritter.icu'] # 允许爬取的范围,防止爬虫爬到了别的网站
start_urls = ['http://alsritter.icu/']
# 先赋初值为 0
page_count = 0
# 当前页数
next_page = 1

def parse(self, response):
div = response.xpath('//div[@class="recent-post-info"]')
for li in div:
item = {'title': li.xpath('./a[@class="article-title"]/text()').get(),
'time': li.xpath('.//span[@class="post-meta__date-updated"]/text()').get()}
# 把这个结果返回给 pipelines,使之处理这个结果
yield item

# 翻页
# 取得页数(第一次才需要获取)
if self.page_count == 0:
self.page_count = int(response.xpath('//div[@class="pagination"]/a[last()-1]/text()').get())

if self.next_page < self.page_count:
self.next_page = self.next_page + 1
next_url = response.urljoin(f'/page/{self.next_page}/')
print(next_url)
# 发出请求
yield scrapy.Request(next_url, self.parse)
else:
print('================结束=================')

# 直接在这个脚本里设置启动,调用命令行包,执行命令
if __name__ == '__main__':
from scrapy import cmdline

cmdline.execute("scrapy crawl testSpider".split())

可以把 MySpider 的取得的值传递给 pipelines ,然后在 pipelines 里进行相应操作(例如持久化)

# pipelines.py 的内容
# 可以有多个 Pipeline,执行顺序是在设置文件里更改
class TempspiderPipeline:
# 拿到 TestspiderSpider 传过来的 item 进行相应处理(例如这里插入数据)
def process_item(self, item, spider):
# 这个 Pipeline 用来加多一个字段
item['temp'] = '这个是测试插入的数据'
# 如果要传递给下一个 Pipeline 这里必须 return
return item


class TempspiderPipeline02:
def process_item(self, item, spider):
print(item)
return item

先开启 pipelines 使之能取得 MySpider 返回的 item 还是修改 setting.py 把这个 ITEM_PIPELINES 注释删掉

ITEM_PIPELINES = {
# TempspiderPipeline 表示要启动的类名,这个 300是一个优先级
# 顺序是从小到大
'TempSpider.pipelines.TempspiderPipeline': 300,
'TempSpider.pipelines.TempspiderPipeline02': 301,
}

可以加上一个随机访问时间:

import time
import random

...
max_time = 5.0
min_time = 1.3

def parse(self, response):
# 先睡眠 1秒再发一次请求(单位是秒)
time.sleep((random.random() * self.max_time) + self.min_time)
...

Reference